/*
 * Copyright © 2022 Apple Inc. and the ServiceTalk project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.servicetalk.http.api;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.serializer.api.SerializationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.RejectedExecutionException;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.http.api.HttpHeaderValues.ZERO;
import static io.servicetalk.http.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.servicetalk.http.api.HttpResponseStatus.PAYLOAD_TOO_LARGE;
import static io.servicetalk.http.api.HttpResponseStatus.SERVICE_UNAVAILABLE;
import static io.servicetalk.http.api.HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE;

/**
 * Filter that maps known {@link Exception} subtypes into an HTTP response with an appropriate
 * {@link HttpResponseStatus}.
 * <p>
 * This filter is recommended to be placed as early as possible to make sure it captures all exceptions that may be
 * generated by other filters.
 */
public final class HttpExceptionMapperServiceFilter implements StreamingHttpServiceFilterFactory {

    /**
     * Instance of {@link HttpExceptionMapperServiceFilter}.
     */
    public static final StreamingHttpServiceFilterFactory INSTANCE = new HttpExceptionMapperServiceFilter();

    private static final Logger LOGGER = LoggerFactory.getLogger(HttpExceptionMapperServiceFilter.class);

    private HttpExceptionMapperServiceFilter() {
        // Singleton
    }

    @Override
    public StreamingHttpServiceFilter create(final StreamingHttpService service) {
        return new StreamingHttpServiceFilter(service) {
            @Override
            public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
                                                        final StreamingHttpRequest request,
                                                        final StreamingHttpResponseFactory responseFactory) {
                Single<StreamingHttpResponse> respSingle;
                try {
                    respSingle = delegate().handle(ctx, request, responseFactory);
                } catch (Throwable cause) {
                    respSingle = failed(cause);
                }
                return respSingle.onErrorReturn(cause -> newErrorResponse(cause, ctx, request, responseFactory));
            }
        };
    }

    private static StreamingHttpResponse newErrorResponse(final Throwable cause,
                                                          final HttpServiceContext ctx,
                                                          final StreamingHttpRequest request,
                                                          final StreamingHttpResponseFactory responseFactory) {
        final HttpResponseStatus status;
        if (cause instanceof RejectedExecutionException) {
            status = SERVICE_UNAVAILABLE;
            LOGGER.error("Task rejected by service processing for connection='{}', request='{} {} {}'. Returning: {}",
                    ctx, request.method(), request.requestTarget(), request.version(), status, cause);
        } else if (cause instanceof SerializationException) {
            // It is assumed that a failure occurred when attempting to deserialize the request.
            status = UNSUPPORTED_MEDIA_TYPE;
            LOGGER.error("Failed to deserialize or serialize for connection='{}', request='{} {} {}'. Returning: {}",
                    ctx, request.method(), request.requestTarget(), request.version(), status, cause);
        } else if (cause instanceof PayloadTooLargeException) {
            status = PAYLOAD_TOO_LARGE;
        } else {
            status = INTERNAL_SERVER_ERROR;
            LOGGER.error("Unexpected exception during service processing for connection='{}', request='{} {} {}'. " +
                            "Trying to return: {}", ctx, request.method(), request.requestTarget(), request.version(),
                    status, cause);
        }
        return responseFactory.newResponse(status).setHeader(CONTENT_LENGTH, ZERO);
    }

    @Override
    public HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();    // no influence since we do not block
    }
}
